package Flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;

/**
 * Small demo job illustrating periodic watermarks with tumbling event-time windows.
 *
 * <p>The pipeline built by {@link #main(String[])}:
 * <ol>
 *   <li>reads a fixed, deliberately out-of-order list of {@code "<key> <timestamp>"} strings;</li>
 *   <li>parses each string into a {@code Tuple2<key, timestamp * 1000>};</li>
 *   <li>assigns the parsed value as the event timestamp and emits watermarks that trail the
 *       largest timestamp seen so far by {@link #MAX_OUT_OF_ORDERNESS};</li>
 *   <li>keys the stream by the first tuple field and groups it into 3-second tumbling
 *       event-time windows;</li>
 *   <li>prints the content and bounds of every window that fires.</li>
 * </ol>
 */
public class WatermarkTest {

    /** Pattern used to render timestamps in the diagnostic output. */
    private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    /**
     * Maximum allowed out-of-orderness: the watermark lags the largest observed timestamp by
     * this amount (same unit as the event timestamps).
     */
    private static final long MAX_OUT_OF_ORDERNESS = 1000L;

    /**
     * Builds and runs the demo pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // The timestamps are intentionally not sorted so the watermark logic has something to do.
        DataStream<String> text = env.fromElements(
                "lei 1584184148", "lei 1584184141", "lei 1584184142",
                "lei 1584184143", "lei 1584184144", "lei 1584184145",
                "lei 1584184146", "lei 1584184147", "lei 1584184140");

        DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
            private static final long serialVersionUID = -8812094804806854937L;

            /**
             * Splits the record on runs of non-word characters and returns
             * (first token, second token * 1000).
             */
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                // Split once and reuse the result instead of re-running the regex per field.
                String[] fields = value.split("\\W+");
                // NOTE(review): the factor 1000 presumably converts epoch seconds to milliseconds.
                return new Tuple2<>(fields[0], 1000 * Long.parseLong(fields[1]));
            }
        });

        DataStream<Tuple2<String, Long>> withTimestamps =
                inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
                    private static final long serialVersionUID = 8252616297345284790L;

                    /** Largest event timestamp observed so far; starts at 0. */
                    private long currentMaxTimestamp = 0L;

                    private final SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_PATTERN);

                    /** Returns a watermark trailing the largest seen timestamp by the allowed lateness. */
                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
                    }

                    /**
                     * Uses the tuple's second field as the event timestamp, tracks the running
                     * maximum and logs both values.
                     */
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        System.out.println("timestamp : " + timestamp + "|" + format.format(timestamp)
                                + " currentMaxTimestamp : " + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp));
                        return timestamp;
                    }
                });

        withTimestamps.keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    private static final long serialVersionUID = 7813420265419629362L;

                    // Created once per function instance rather than on every window firing.
                    private final SimpleDateFormat format = new SimpleDateFormat(TIMESTAMP_PATTERN);

                    /** Emits one string describing the window's elements and its start/end bounds. */
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        System.out.println("apply : " + input);
                        out.collect("input  " + input + "  window  " + format.format(window.getStart()) + "   window  " + format.format(window.getEnd()));
                    }
                })
                .addSink(new RichSinkFunction<String>() {
                    private static final long serialVersionUID = 1L;

                    /** Prints every window result to standard output. */
                    @Override
                    public void invoke(String value, Context context) throws Exception {
                        System.out.println("RichSinkFunction : " + value);
                    }
                });

        env.execute("window test");
    }
}